/*-
 * Public Domain 2014-2019 MongoDB, Inc.
 * Public Domain 2008-2014 WiredTiger, Inc.
 *
 * This is free and unencumbered software released into the public domain.
 *
 * Anyone is free to copy, modify, publish, use, compile, sell, or
 * distribute this software, either in source code form or as a compiled
 * binary, for any purpose, commercial or non-commercial, and by any
 * means.
 *
 * In jurisdictions that recognize copyright laws, the author or authors
 * of this software dedicate any and all copyright interest in the
 * software to the public domain. We make this dedication for the benefit
 * of the public at large and to the detriment of our heirs and
 * successors. We intend this dedication to be an overt act of
 * relinquishment in perpetuity of all present and future rights to this
 * software under copyright law.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 * OTHER DEALINGS IN THE SOFTWARE.
 */
#include "test_util.h"

/*
 * JIRA ticket reference: WT-2323
 *
 * Test case description: We create two kinds of threads that race: One kind
 * is populating/updating records in a table with a few indices, the other
 * is reading from a join of that table. The hope in constructing this test
 * was to have the updates interleaved between reads of multiple indices by
 * the join, yielding an inconsistent view of the data.  In the main table,
 * we insert account records, with a positive or negative balance.  The
 * negative balance accounts always have a flag set to non-zero, positive
 * balances have the flag set to zero.  The join we do is:
 *
 *   select * from account where account.postal_code = '54321' and
 *                      account.balance < 0 and account.flag = 0
 *
 * which should always yield no results.
 *
 * Failure mode: This test never actually failed with any combination of
 * parameters, with N_INSERT up to 50000000.  It seems that a snapshot is
 * implicitly allocated in the session used by a join by the set_key calls
 * that occur before the first 'next' of the join cursor is done.  Despite
 * that, the test seems interesting enough to keep around, with the number
 * of inserts set low as a default.
 */

/*
 * Test sizing. Insert threads pick keys in [0, N_RECORDS) and each performs N_INSERT operations;
 * N_RECORDS is also the expected entry count given to the join Bloom filters.
 */
#define N_RECORDS 10000
#define N_INSERT 500000
#define N_INSERT_THREAD 2
#define N_JOIN_THREAD 2

/*
 * Filler for the "extra" column: a 64-byte pattern concatenated 16 times into a 1024-byte string
 * literal. S256 must stay unparenthesized so adjacent literals still concatenate.
 */
#define S64 "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789::"
#define S256 S64 S64 S64 S64
#define S1024 (S256 S256 S256 S256)

/*
 * Settings shared by every thread of one test_join pass: the URIs of the three indices and of the
 * join, plus the configuration of the current pass.
 */
typedef struct {
    /* URIs of the post, bal and flag indices, then the join URI over the table. */
    char posturi[256], baluri[256], flaguri[256], joinuri[256];

    /*
     * bloom: join threads use strategy=bloom for the balance and flag conditions.
     * remove: insert threads sometimes remove records, running each operation in a transaction.
     */
    bool bloom, remove;
} SHARED_OPTS;

/*
 * Per-thread arguments and counters. test_join gives each insert thread and each join thread its
 * own zeroed instance and prints the counters after joining the threads.
 */
typedef struct {
    TEST_OPTS *testopts;     /* Supplies the connection and the table URI. */
    SHARED_OPTS *sharedopts; /* Settings common to all threads of this pass. */
    int threadnum, nthread;  /* This thread's index and the number of threads of its kind. */

    /*
     * Set to 1 by test_join to stop a join thread. NOTE(review): it is written and read by
     * different threads without synchronization; consider an atomic/barrier-protected flag.
     */
    int done;

    /* Counters: joins is kept by join threads, the others by insert threads. */
    int joins, removes, inserts, notfounds, rollbacks;
} THREAD_ARGS;

/* Thread entry points and the per-configuration driver, defined below main. */
static void *thread_insert(void *arg);
static void *thread_join(void *arg);
static void test_join(TEST_OPTS *opts, SHARED_OPTS *sharedopts, bool bloom, bool sometimes_remove);

/*
 * main --
 *     Derive the index and join URIs from the table URI given on the command line, open the
 *     connection and run test_join once for each combination of the bloom and remove settings.
 */
int
main(int argc, char *argv[])
{
    SHARED_OPTS *sharedopts, shared_storage;
    TEST_OPTS *opts, opts_storage;
    const char *sep, *tablename;
    int pass;

    /* Runs under valgrind skip this test entirely. */
    if (testutil_is_flag_set("TESTUTIL_BYPASS_VALGRIND"))
        return (EXIT_SUCCESS);

    opts = &opts_storage;
    memset(opts, 0, sizeof(*opts));
    sharedopts = &shared_storage;
    memset(sharedopts, 0, sizeof(*sharedopts));

    testutil_check(testutil_parse_opts(argc, argv, opts));
    testutil_make_work_dir(opts->home);

    /* The table name is whatever follows the first ':' of the URI. */
    sep = strchr(opts->uri, ':');
    testutil_assert(sep != NULL);
    tablename = sep + 1;

    testutil_check(
      __wt_snprintf(sharedopts->posturi, sizeof(sharedopts->posturi), "index:%s:post", tablename));
    testutil_check(
      __wt_snprintf(sharedopts->baluri, sizeof(sharedopts->baluri), "index:%s:bal", tablename));
    testutil_check(
      __wt_snprintf(sharedopts->flaguri, sizeof(sharedopts->flaguri), "index:%s:flag", tablename));
    testutil_check(
      __wt_snprintf(sharedopts->joinuri, sizeof(sharedopts->joinuri), "join:%s", opts->uri));

    testutil_check(wiredtiger_open(opts->home, NULL, "create,cache_size=1G", &opts->conn));

    /*
     * Four passes, in order: (bloom, remove) = (true, true), (true, false), (false, true),
     * (false, false).
     */
    for (pass = 0; pass < 4; ++pass)
        test_join(opts, sharedopts, pass < 2, pass % 2 == 0);

    testutil_cleanup(opts);

    return (0);
}

/*
 * test_join --
 *     One pass of the test. Create the table and its three indices, seed the record the join
 *     threads search on, run the insert threads to completion while join threads run, stop the
 *     join threads, print the per-thread counters and drop the table and indices.
 */
static void
test_join(TEST_OPTS *opts, SHARED_OPTS *sharedopts, bool bloom, bool sometimes_remove)
{
    THREAD_ARGS *ta;
    THREAD_ARGS insert_args[N_INSERT_THREAD], join_args[N_JOIN_THREAD];
    WT_CURSOR *seedcur;
    WT_SESSION *session;
    pthread_t insert_tid[N_INSERT_THREAD], join_tid[N_JOIN_THREAD];
    int t;

    sharedopts->bloom = bloom;
    sharedopts->remove = sometimes_remove;
    memset(insert_args, 0, sizeof(insert_args));
    memset(join_args, 0, sizeof(join_args));

    fprintf(stderr, "Running with bloom=%d, remove=%d\n", (int)bloom, (int)sometimes_remove);

    testutil_check(opts->conn->open_session(opts->conn, NULL, NULL, &session));

    /*
     * The last column, id2, repeats the key so the primary key can be spotted in dumps of the
     * index files.
     */
    testutil_check(session->create(session, opts->uri,
      "key_format=i,value_format=iiSii,"
      "columns=(id,post,bal,extra,flag,id2)"));
    testutil_check(session->create(session, sharedopts->posturi, "columns=(post)"));
    testutil_check(session->create(session, sharedopts->baluri, "columns=(bal)"));
    testutil_check(session->create(session, sharedopts->flaguri, "columns=(flag)"));

    /*
     * Seed one record whose post (54321), bal (0) and flag (0) are exactly the keys the join
     * threads search for. Its key, N_RECORDS, is outside the range the insert threads write, so
     * those searches always find something.
     */
    testutil_check(session->open_cursor(session, opts->uri, NULL, NULL, &seedcur));
    seedcur->set_key(seedcur, N_RECORDS);
    seedcur->set_value(seedcur, 54321, 0, "", 0, N_RECORDS);
    testutil_check(seedcur->insert(seedcur));
    testutil_check(seedcur->close(seedcur));

    /* Start the writers, then the readers. */
    for (t = 0; t < N_INSERT_THREAD; ++t) {
        ta = &insert_args[t];
        ta->threadnum = t;
        ta->nthread = N_INSERT_THREAD;
        ta->testopts = opts;
        ta->sharedopts = sharedopts;
        testutil_check(pthread_create(&insert_tid[t], NULL, thread_insert, ta));
    }
    for (t = 0; t < N_JOIN_THREAD; ++t) {
        ta = &join_args[t];
        ta->threadnum = t;
        ta->nthread = N_JOIN_THREAD;
        ta->testopts = opts;
        ta->sharedopts = sharedopts;
        testutil_check(pthread_create(&join_tid[t], NULL, thread_join, ta));
    }

    /* Join threads loop until told to stop, which happens once every writer has finished. */
    for (t = 0; t < N_INSERT_THREAD; ++t)
        testutil_check(pthread_join(insert_tid[t], NULL));
    for (t = 0; t < N_JOIN_THREAD; ++t)
        join_args[t].done = 1;
    for (t = 0; t < N_JOIN_THREAD; ++t)
        testutil_check(pthread_join(join_tid[t], NULL));

    /* Report. */
    fprintf(stderr, "\n");
    for (t = 0; t < N_JOIN_THREAD; ++t)
        fprintf(stderr, "  join thread %d did %d joins\n", t, join_args[t].joins);
    for (t = 0; t < N_INSERT_THREAD; ++t) {
        ta = &insert_args[t];
        fprintf(stderr,
          "  insert thread %d did "
          "%d inserts, %d removes, %d notfound, %d rollbacks\n",
          t, ta->inserts, ta->removes, ta->notfounds, ta->rollbacks);
    }

    /* Drop the indices, then the table, so the next pass starts from scratch. */
    testutil_check(session->drop(session, sharedopts->posturi, NULL));
    testutil_check(session->drop(session, sharedopts->baluri, NULL));
    testutil_check(session->drop(session, sharedopts->flaguri, NULL));
    testutil_check(session->drop(session, opts->uri, NULL));
    testutil_check(session->close(session, NULL));
}

static void *
thread_insert(void *arg)
{
    SHARED_OPTS *sharedopts;
    TEST_OPTS *opts;
    THREAD_ARGS *threadargs;
    WT_CURSOR *maincur;
    WT_RAND_STATE rnd;
    WT_SESSION *session;
    int bal, i, flag, key, post, ret;
    const char *extra = S1024;

    threadargs = (THREAD_ARGS *)arg;
    opts = threadargs->testopts;
    sharedopts = threadargs->sharedopts;
    __wt_random_init_seed(NULL, &rnd);

    testutil_check(opts->conn->open_session(opts->conn, NULL, NULL, &session));

    testutil_check(session->open_cursor(session, opts->uri, NULL, NULL, &maincur));

    for (i = 0; i < N_INSERT; i++) {
        /*
         * Insert threads may stomp on each other's records; that's okay.
         */
        key = (int)(__wt_random(&rnd) % N_RECORDS);
        maincur->set_key(maincur, key);
        if (sharedopts->remove)
            testutil_check(session->begin_transaction(session, "isolation=snapshot"));
        if (sharedopts->remove && __wt_random(&rnd) % 5 == 0 && maincur->search(maincur) == 0) {
            /*
             * Another thread can be removing at the same time.
             */
            ret = maincur->remove(maincur);
            testutil_assert(
              ret == 0 || (N_INSERT_THREAD > 1 && (ret == WT_NOTFOUND || ret == WT_ROLLBACK)));
            if (ret == 0)
                threadargs->removes++;
            else if (ret == WT_NOTFOUND)
                threadargs->notfounds++;
            else if (ret == WT_ROLLBACK)
                threadargs->rollbacks++;
        } else {
            if (__wt_random(&rnd) % 2 == 0)
                post = 54321;
            else
                post = i % 100000;
            if (__wt_random(&rnd) % 2 == 0) {
                bal = -100;
                flag = 1;
            } else {
                bal = 1 + (i % 1000) * 100;
                flag = 0;
            }
            maincur->set_value(maincur, post, bal, extra, flag, key);
            ret = maincur->insert(maincur);
            testutil_assert(ret == 0 || (N_INSERT_THREAD > 1 && ret == WT_ROLLBACK));
            testutil_check(maincur->reset(maincur));
            if (ret == 0)
                threadargs->inserts++;
            else if (ret == WT_ROLLBACK)
                threadargs->rollbacks++;
        }
        if (sharedopts->remove)
            testutil_check(session->commit_transaction(session, NULL));
        if (i % 1000 == 0 && i != 0) {
            if (i % 10000 == 0)
                fprintf(stderr, "*");
            else
                fprintf(stderr, ".");
        }
    }
    testutil_check(maincur->close(maincur));
    testutil_check(session->close(session, NULL));
    return (NULL);
}

static void *
thread_join(void *arg)
{
    SHARED_OPTS *sharedopts;
    TEST_OPTS *opts;
    THREAD_ARGS *threadargs;
    WT_CURSOR *balcur, *flagcur, *joincur, *postcur;
    WT_SESSION *session;
    int bal, flag, key, key2, post, ret;
    char cfg[128];
    char *extra;

    threadargs = (THREAD_ARGS *)arg;
    opts = threadargs->testopts;
    sharedopts = threadargs->sharedopts;

    testutil_check(opts->conn->open_session(opts->conn, NULL, NULL, &session));

    testutil_check(session->open_cursor(session, sharedopts->posturi, NULL, NULL, &postcur));
    testutil_check(session->open_cursor(session, sharedopts->baluri, NULL, NULL, &balcur));
    testutil_check(session->open_cursor(session, sharedopts->flaguri, NULL, NULL, &flagcur));

    for (threadargs->joins = 0; threadargs->done == 0; threadargs->joins++) {
        testutil_check(session->open_cursor(session, sharedopts->joinuri, NULL, NULL, &joincur));
        postcur->set_key(postcur, 54321);
        testutil_check(postcur->search(postcur));
        testutil_check(session->join(session, joincur, postcur, "compare=eq"));

        balcur->set_key(balcur, 0);
        testutil_check(balcur->search(balcur));
        if (sharedopts->bloom)
            testutil_check(
              __wt_snprintf(cfg, sizeof(cfg), "compare=lt,strategy=bloom,count=%d", N_RECORDS));
        else
            testutil_check(__wt_snprintf(cfg, sizeof(cfg), "compare=lt"));
        testutil_check(session->join(session, joincur, balcur, cfg));

        flagcur->set_key(flagcur, 0);
        testutil_check(flagcur->search(flagcur));
        if (sharedopts->bloom)
            testutil_check(
              __wt_snprintf(cfg, sizeof(cfg), "compare=eq,strategy=bloom,count=%d", N_RECORDS));
        else
            testutil_check(__wt_snprintf(cfg, sizeof(cfg), "compare=eq"));
        testutil_check(session->join(session, joincur, flagcur, cfg));

        /* Expect no values returned */
        ret = joincur->next(joincur);
        if (ret == 0) {
            /*
             * The values may already have been changed, but print them for informational purposes.
             */
            testutil_check(joincur->get_key(joincur, &key));
            testutil_check(joincur->get_value(joincur, &post, &bal, &extra, &flag, &key2));
            fprintf(stderr,
              "FAIL: iteration %d: "
              "key=%d/%d, postal_code=%d, balance=%d, flag=%d\n",
              threadargs->joins, key, key2, post, bal, flag);
            /* Save the results. */
            testutil_check(opts->conn->close(opts->conn, NULL));
            opts->conn = NULL;
            return (NULL);
        }
        testutil_assert(ret == WT_NOTFOUND);
        testutil_check(joincur->close(joincur));

        /*
         * Reset the cursors, potentially allowing the insert threads to proceed.
         */
        testutil_check(postcur->reset(postcur));
        testutil_check(balcur->reset(balcur));
        testutil_check(flagcur->reset(flagcur));
        if (threadargs->joins % 100 == 0)
            fprintf(stderr, "J");
    }
    testutil_check(postcur->close(postcur));
    testutil_check(balcur->close(balcur));
    testutil_check(flagcur->close(flagcur));
    testutil_check(session->close(session, NULL));
    return (NULL);
}
